{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "4f3aebde-2527-4e9a-8121-a54acfc401c4",
   "metadata": {},
   "source": [
    "https://www.unite.ai/asynchronous-llm-api-calls-in-python-a-comprehensive-guide/\n",
    "- A guide to make asynchronous calls to OpenAI API with rate limiting.\n",
    "- Backoff is a python library that provides decorators that can used to wrap a function and retry until a specified condition is met."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "78eca8d0-6984-4d09-8481-b61037da20f5",
   "metadata": {},
   "source": [
    "### Asyncio"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c6151053-9157-46c6-bac9-2026f2274cd9",
   "metadata": {},
   "source": [
    "- Asyncio is a Python library that provides asynchronous I/O.\n",
    "    - 协程（coroutine）：使用async def定义的函数\n",
    "        - 可以在执行过程中暂停和恢复\n",
    "        - 必须使用await来调用\n",
    "    - await 关键字\n",
    "        - 用于暂停协程执行，等待异步操作完成\n",
    "        - 只能在async函数内使用\n",
    "    - asyncio.run\n",
    "        - 执行 async def 定义的事件循环（event loop）\n",
    "    - `asyncio.gather()`同时处理多个异步任务\n",
    "- It relieves you of the burden of worrying about **locks and threads** when writing concurrent programs.\n",
    "- The foundation of asyncio is the idea of **coroutines**, or functions that can be interrupted and then resumed."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2efff8fe-4aa2-4164-a78c-581b1ae39359",
   "metadata": {},
   "source": [
    "- io bound vs. cpu bound\n",
    "    - I/O 操作指的是程序需要等待外部资源完成其工作的操作。这些外部资源通常比 CPU 慢得多。当程序执行 I/O 操作时，CPU 常常处于空闲等待状态。\n",
    "        - time.sleep() (同步) 和 asyncio.sleep() (异步) 通常被归类为 I/O 操作，或者更准确地说，它们表现得像 I/O 操作。\n",
    "        - 调用大型语言模型 (LLM) API 的操作绝对是 I/O 密集型 (I/O Bound) 操作。\n",
    "            - 网络通信是核心：\n",
    "                - 当你调用一个 LLM API 时，你的程序需要通过网络将你的请求（例如，prompt、参数等）发送到托管 LLM 模型的远程服务器。\n",
    "                - 然后，你的程序必须等待远程服务器处理这个请求。这个处理过程可能涉及到模型加载、计算、生成文本等，这部分是在远程服务器上发生的，而不是在你的本地机器上。\n",
    "                - 最后，远程服务器通过网络将响应（生成的文本、错误信息等）发送回你的程序。\n",
    "        - 等待时间远大于本地计算时间：\n",
    "            - 网络延迟 (Latency)：数据在互联网上传输需要时间。\n",
    "            - 服务器处理时间：LLM 模型本身可能很大，推理计算也需要时间，尤其是在高负载情况下，服务器可能还需要排队处理你的请求。\n",
    "            - 数据传输时间：请求和响应的数据量也可能影响传输时间。\n",
    "            - 在整个 API 调用过程中，你的本地程序大部分时间都花在等待网络传输完成和远程服务器响应上。本地 CPU 在这段时间内基本是空闲的，或者只做一些非常轻量级的工作（比如序列化/反序列化数据）。\n",
    "        - CPU 利用率低（本地）：\n",
    "            - 当你的代码执行 `response = await client.chat.completions.create(...)` (或者类似的同步调用 `response = client.chat.completions.create(...)`) 时，你的程序会暂停执行，等待网络操作完成。\n",
    "            - 在这段等待期间，你的本地 CPU 可以被操作系统或事件循环用于执行其他任务（如果是异步调用且有其他任务）或处于空闲状态。\n",
    "    - CPU 密集型操作指的是程序主要时间消耗在执行计算任务上，CPU 持续高速运转。\n",
    "    - CPU密集型任务不适合异步编程的原因\n",
    "        - 异步编程的优势在于I/O等待时的切换\n",
    "\n",
    "```python\n",
    "# I/O密集型任务 - 适合异步\n",
    "async def io_task():\n",
    "    await asyncio.sleep(1)  # 在等待I/O时，可以切换到其他任务\n",
    "    return \"完成\"\n",
    "\n",
    "# CPU密集型任务 - 不适合异步\n",
    "async def cpu_task():\n",
    "    result = 0\n",
    "    for i in range(1000000000):  # CPU计算，无法切换\n",
    "        result += i\n",
    "    return result\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "843061ec-e183-4ad6-bcd1-8af0948801f9",
   "metadata": {},
   "source": [
    "### OpenAI vs. AsyncOpenAI"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e25fdcae-1f13-4efc-ba4e-a89107bc6fd2",
   "metadata": {},
   "source": [
    "- `async def` 方法, 必须用 `await`\n",
    "- `非阻塞I/O`：当你调用 await client.chat.completions.create(...) 时，它会告诉 asyncio 的事件循环：“我已经发出了一个网络请求，现在需要等待响应。在我等待的时候，你可以去执行其他任务（比如发出另外一个API请求）。”\n",
    "- `并发执行`：正因为这种“等待时让出控制权”的特性，asyncio.gather(*tasks) 才能实现真正的并发。它会同时启动所有的API请求，然后等待所有请求完成。总耗时约等于最慢的那个请求的耗时，而不是所有请求耗时的总和。"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9868ffed-6e3f-4bb1-987a-df10d06b1cad",
   "metadata": {},
   "source": [
    "### Asynchronous LLM API Calls"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3e72da5d-b6a5-4873-8f9c-11c888cba418",
   "metadata": {},
   "outputs": [],
   "source": [
    "from dotenv import load_dotenv, find_dotenv\n",
    "assert load_dotenv(find_dotenv())"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d7b73859-3212-4449-a2d2-2a94ca6c887d",
   "metadata": {},
   "source": [
    "```python\n",
    "import asyncio\n",
    "import aiohttp\n",
    "from openai import AsyncOpenAI\n",
    "\n",
    "async def generate_text(prompt, client):\n",
    "    response = await client.chat.completions.create(\n",
    "        model=\"gpt-4.1-mini-2025-04-14\",\n",
    "        messages=[{\"role\": \"user\", \"content\": prompt}]\n",
    "    )\n",
    "    return response.choices[0].message.content\n",
    "    \n",
    "async def main():\n",
    "    prompts = [\n",
    "        \"Explain quantum computing in simple terms.\",\n",
    "        \"Write a haiku about artificial intelligence.\",\n",
    "        \"Describe the process of photosynthesis.\"\n",
    "    ]\n",
    "     \n",
    "    async with AsyncOpenAI() as client:\n",
    "        tasks = [generate_text(prompt, client) for prompt in prompts]\n",
    "        results = await asyncio.gather(*tasks)\n",
    "     \n",
    "    for prompt, result in zip(prompts, results):\n",
    "        print(f\"Prompt: {prompt}\\nResponse: {result}\\n\")\n",
    "        \n",
    "asyncio.run(main())\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9f13e52f-c17d-41cb-88fc-1eb11f0fff97",
   "metadata": {},
   "source": [
    "### Batching and Concurrency Control"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "77d8e98e-e215-4686-b53f-a5c9160ec66e",
   "metadata": {},
   "source": [
    "```python\n",
    "async def process_batch(batch, client):\n",
    "    responses = await asyncio.gather(*[\n",
    "        client.chat.completions.create(\n",
    "            model=\"gpt-4.1-mini-2025-04-14\",\n",
    "            messages=[{\"role\": \"user\", \"content\": prompt}]\n",
    "        ) for prompt in batch\n",
    "    ])\n",
    "    return [response.choices[0].message.content for response in responses]\n",
    "    \n",
    "async def main():\n",
    "    prompts = [f\"Tell me a fact about number {i}\" for i in range(100)]\n",
    "    batch_size = 10\n",
    "     \n",
    "    async with AsyncOpenAI() as client:\n",
    "        results = []\n",
    "        for i in range(0, len(prompts), batch_size):\n",
    "            batch = prompts[i:i+batch_size]\n",
    "            batch_results = await process_batch(batch, client)\n",
    "            results.extend(batch_results)\n",
    "     \n",
    "    for prompt, result in zip(prompts, results):\n",
    "        print(f\"Prompt: {prompt}\\nResponse: {result}\\n\")\n",
    "asyncio.run(main())\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "0b84753c-b552-4cbe-8c03-6041ab50377d",
   "metadata": {},
   "source": [
    "### semaphore"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "fb59957d-e904-475a-b5b2-0891cf283e99",
   "metadata": {},
   "source": [
    "- asyncio.Semaphore：控制并发；\n",
    "    - 用于限制同时运行的协程数量，从而防止资源过载或满足某些并发限制需求。\n",
    "- asyncio.as_completed：获取实时结果；\n",
    "    - asyncio.as_completed 会在每个任务完成时生成一个迭代器，使您能够立即处理完成的任务，而不必等待所有任务结束。"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "b0eb31c0-b55e-4478-8af1-07564e99a4de",
   "metadata": {},
   "source": [
    "```python\n",
    "import asyncio\n",
    "from openai import AsyncOpenAI\n",
    "\n",
    "async def generate_text(prompt, client, semaphore):\n",
    "    async with semaphore:\n",
    "        response = await client.chat.completions.create(\n",
    "            model=\"gpt-4.1-mini-2025-04-14\",\n",
    "            messages=[{\"role\": \"user\", \"content\": prompt}]\n",
    "        )\n",
    "        return response.choices[0].message.content\n",
    "        \n",
    "async def main():\n",
    "    prompts = [f\"Tell me a fact about number {i}\" for i in range(100)]\n",
    "    max_concurrent_requests = 5\n",
    "    semaphore = asyncio.Semaphore(max_concurrent_requests)\n",
    "     \n",
    "    async with AsyncOpenAI() as client:\n",
    "        tasks = [generate_text(prompt, client, semaphore) for prompt in prompts]\n",
    "        results = await asyncio.gather(*tasks)\n",
    "     \n",
    "    for prompt, result in zip(prompts, results):\n",
    "        print(f\"Prompt: {prompt}\\nResponse: {result}\\n\")\n",
    "asyncio.run(main())\n",
    "```"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
